/*
Copyright 2025 The Volcano Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package api

import (
	"fmt"
	"hash/fnv"
	"strconv"
	"strings"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/rand"
	"k8s.io/apimachinery/pkg/util/sets"

	"volcano.sh/apis/pkg/apis/scheduling"
)

// SubJobID is the unique identifier of a SubJobInfo.
type SubJobID types.UID

// SubJobInfo holds the scheduling state of a subJob: a group of a job's tasks
// that share the same subGroup policy match values and are scheduled together.
type SubJobInfo struct {
	UID SubJobID
	Job JobID

	MinAvailable int32 // minimum number of tasks that must be ready for the subJob to be ready
	Priority     int32 // determined by the highest-priority task in the subJob
	MatchIndex   int   // integer parsed from the first match-label value of the subJob's pods

	Tasks           map[TaskID]*TaskInfo
	TaskStatusIndex map[TaskStatus]TasksMap
	taskPriorities  map[int32]sets.Set[TaskID]

	AllocatedHyperNode string

	networkTopology *scheduling.NetworkTopologySpec
}

// NewSubJobInfo creates a SubJobInfo for the given job from the subGroup
// policy and the pods' match-label values. MinAvailable defaults to 1 when
// the policy does not set SubGroupSize.
func NewSubJobInfo(uid SubJobID, job JobID, policy *scheduling.SubGroupPolicySpec, matchValues []string) *SubJobInfo {
	sji := &SubJobInfo{
		UID:             uid,
		Job:             job,
		MinAvailable:    1,
		Tasks:           make(map[TaskID]*TaskInfo),
		TaskStatusIndex: make(map[TaskStatus]TasksMap),
		taskPriorities:  make(map[int32]sets.Set[TaskID]),
	}
	if policy != nil {
		if policy.SubGroupSize != nil {
			sji.MinAvailable = *policy.SubGroupSize
		}
		if policy.NetworkTopology != nil {
			sji.networkTopology = policy.NetworkTopology.DeepCopy()
		}
	}
	if len(matchValues) > 0 {
		if v, err := strconv.Atoi(matchValues[0]); err == nil {
			sji.MatchIndex = v
		}
	}
	return sji
}
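
// A minimal construction sketch (the UID, job ID and values below are
// illustrative, not taken from a real spec):
//
//	size := int32(4)
//	policy := &scheduling.SubGroupPolicySpec{SubGroupSize: &size}
//	sji := NewSubJobInfo("ns/job-1/policy-a-0", "ns/job-1", policy, []string{"0"})
//	// sji.MinAvailable == 4, sji.MatchIndex == 0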

// IsHardTopologyMode returns whether the subJob's network topology mode is
// hard, together with the highest allowed tier.
func (sji *SubJobInfo) IsHardTopologyMode() (bool, int) {
	if sji.networkTopology == nil || sji.networkTopology.HighestTierAllowed == nil {
		return false, 0
	}

	return sji.networkTopology.Mode == scheduling.HardNetworkTopologyMode, *sji.networkTopology.HighestTierAllowed
}

// IsSoftTopologyMode returns whether the subJob's network topology mode is soft.
func (sji *SubJobInfo) IsSoftTopologyMode() bool {
	if sji.networkTopology == nil {
		return false
	}
	return sji.networkTopology.Mode == scheduling.SoftNetworkTopologyMode
}

// WithNetworkTopology returns whether the subJob has a network topology configured.
func (sji *SubJobInfo) WithNetworkTopology() bool {
	return sji.networkTopology != nil
}
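
// A behavioral sketch of the mode helpers (field values are illustrative).
// Note that hard mode without HighestTierAllowed reports as non-hard:
//
//	tier := 2
//	sji.networkTopology = &scheduling.NetworkTopologySpec{
//		Mode:               scheduling.HardNetworkTopologyMode,
//		HighestTierAllowed: &tier,
//	}
//	isHard, highest := sji.IsHardTopologyMode() // true, 2
//	isSoft := sji.IsSoftTopologyMode()          // false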

// addTask indexes the task by UID, status and priority, raising the subJob's
// Priority when the task outranks it.
func (sji *SubJobInfo) addTask(ti *TaskInfo) {
	sji.Tasks[ti.UID] = ti

	if _, found := sji.TaskStatusIndex[ti.Status]; !found {
		sji.TaskStatusIndex[ti.Status] = TasksMap{}
	}
	sji.TaskStatusIndex[ti.Status][ti.UID] = ti

	if _, found := sji.taskPriorities[ti.Priority]; !found {
		sji.taskPriorities[ti.Priority] = sets.New[TaskID]()
	}
	sji.taskPriorities[ti.Priority].Insert(ti.UID)
	if ti.Priority > sji.Priority {
		sji.Priority = ti.Priority
	}
}

// deleteTask removes the task from all indexes, recomputing the subJob's
// Priority when the removed task was the last one at the highest priority.
func (sji *SubJobInfo) deleteTask(ti *TaskInfo) {
	delete(sji.Tasks, ti.UID)

	if tasks, found := sji.TaskStatusIndex[ti.Status]; found {
		delete(tasks, ti.UID)
		if len(tasks) == 0 {
			delete(sji.TaskStatusIndex, ti.Status)
		}
	}

	if tasks, found := sji.taskPriorities[ti.Priority]; found {
		delete(tasks, ti.UID)
		if len(tasks) == 0 {
			delete(sji.taskPriorities, ti.Priority)
			// Priority is the maximum over all tasks, so a strict `>` here
			// could never trigger; recompute when the emptied bucket held the
			// current highest priority.
			if ti.Priority >= sji.Priority {
				sji.Priority = sji.getTaskHighestPriority()
			}
		}
	}
}

// getTaskHighestPriority returns the highest priority among the subJob's
// remaining tasks, or 0 when there are none.
func (sji *SubJobInfo) getTaskHighestPriority() int32 {
	var highestPriority int32
	for priority := range sji.taskPriorities {
		if priority > highestPriority {
			highestPriority = priority
		}
	}
	return highestPriority
}
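
// A priority-bookkeeping sketch (the bare TaskInfo literals are illustrative;
// real tasks are built from pods):
//
//	sji.addTask(&TaskInfo{UID: "t1", Priority: 10})
//	sji.addTask(&TaskInfo{UID: "t2", Priority: 10})
//	sji.addTask(&TaskInfo{UID: "t3", Priority: 5}) // sji.Priority == 10
//	sji.deleteTask(sji.Tasks["t1"])                // still 10: t2 remains
//	sji.deleteTask(sji.Tasks["t2"])                // recomputed to 5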

// getSubJobMatchValues extracts the pod's label values for each key in the
// policy's match policy; it returns nil when any key is missing or empty.
func getSubJobMatchValues(policy scheduling.SubGroupPolicySpec, pod *v1.Pod) []string {
	if len(policy.MatchPolicy) == 0 || pod.Labels == nil {
		return nil
	}

	values := make([]string, 0, len(policy.MatchPolicy))
	for _, p := range policy.MatchPolicy {
		value, ok := pod.Labels[p.LabelKey]
		if !ok || value == "" {
			return nil
		}
		values = append(values, value)
	}
	return values
}

// getSubJobID derives a SubJobID from the job ID, the policy name and the
// joined match values, hashing the joined values when they exceed 128
// characters.
func getSubJobID(job JobID, policy string, matchValues []string) SubJobID {
	id := strings.Join(matchValues, "-")
	if len(id) > 128 {
		hasher := fnv.New32a()
		_, _ = hasher.Write([]byte(id))
		id = rand.SafeEncodeString(fmt.Sprint(hasher.Sum32())) // todo handle collision
	}
	return SubJobID(fmt.Sprintf("%s/%s-%s", job, policy, id))
}
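
// An end-to-end identity sketch (label key, job ID and policy name are
// illustrative): for a pod labeled "rack"="0" under a match policy on key
// "rack",
//
//	values := getSubJobMatchValues(policy, pod)       // ["0"]
//	id := getSubJobID("ns/job-1", "policy-a", values)
//	// id == SubJobID("ns/job-1/policy-a-0")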

// IsReady returns whether the ready and pending best-effort tasks together
// satisfy MinAvailable.
func (sji *SubJobInfo) IsReady() bool {
	return sji.ReadyTaskNum()+sji.PendingBestEffortTaskNum() >= sji.MinAvailable
}

// IsPipelined returns whether the waiting, ready and pending best-effort
// tasks together satisfy MinAvailable.
func (sji *SubJobInfo) IsPipelined() bool {
	return sji.WaitingTaskNum()+sji.ReadyTaskNum()+sji.PendingBestEffortTaskNum() >= sji.MinAvailable
}
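
// A worked readiness example: with MinAvailable == 3, two Running tasks and
// one best-effort Pending task give ReadyTaskNum() == 2 and
// PendingBestEffortTaskNum() == 1, so IsReady() holds; if the third task were
// instead Pipelined, only IsPipelined() would hold.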

// ReadyTaskNum returns the number of tasks that are bound, binding, running,
// allocated or succeeded.
func (sji *SubJobInfo) ReadyTaskNum() int32 {
	occupied := 0
	occupied += len(sji.TaskStatusIndex[Bound])
	occupied += len(sji.TaskStatusIndex[Binding])
	occupied += len(sji.TaskStatusIndex[Running])
	occupied += len(sji.TaskStatusIndex[Allocated])
	occupied += len(sji.TaskStatusIndex[Succeeded])

	return int32(occupied)
}

// PendingBestEffortTaskNum returns the number of pending tasks that are
// marked best-effort.
func (sji *SubJobInfo) PendingBestEffortTaskNum() int32 {
	count := 0
	for _, task := range sji.TaskStatusIndex[Pending] {
		if task.BestEffort {
			count++
		}
	}
	return int32(count)
}

// WaitingTaskNum returns the number of tasks that are pipelined.
func (sji *SubJobInfo) WaitingTaskNum() int32 {
	return int32(len(sji.TaskStatusIndex[Pipelined]))
}

// AllocatedTaskNum returns the number of tasks whose status counts as
// allocated (see AllocatedStatus).
func (sji *SubJobInfo) AllocatedTaskNum() int32 {
	count := 0
	for status, tasks := range sji.TaskStatusIndex {
		if AllocatedStatus(status) {
			count += len(tasks)
		}
	}
	return int32(count)
}

// CloneStatusFrom copies the scheduling status (currently the allocated
// hyperNode) from the source SubJobInfo.
func (sji *SubJobInfo) CloneStatusFrom(source *SubJobInfo) {
	sji.AllocatedHyperNode = source.AllocatedHyperNode
}
